import os
import sys
from io import (BytesIO, UnsupportedOperation)
import threading
import warnings

import numpy as np
from numpy.testing import (assert_equal, assert_, assert_array_equal,
                           break_cycles, IS_PYPY)
import pytest
from pytest import raises, warns

from scipy.io import wavfile


def datafile(fn):
    return os.path.join(os.path.dirname(__file__), 'data', fn)


def test_read_1():
    # 32-bit PCM (which uses extensible format)
    for mmap in [False, True]:
        filename = 'test-44100Hz-le-1ch-4bytes.wav'
        rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 44100)
        assert_(np.issubdtype(data.dtype, np.int32))
        assert_equal(data.shape, (4410,))

        del data


def test_read_2():
    # 8-bit unsigned PCM
    for mmap in [False, True]:
        filename = 'test-8000Hz-le-2ch-1byteu.wav'
        rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 8000)
        assert_(np.issubdtype(data.dtype, np.uint8))
        assert_equal(data.shape, (800, 2))

        del data


def test_read_3():
    # Little-endian float
    for mmap in [False, True]:
        filename = 'test-44100Hz-2ch-32bit-float-le.wav'
        rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 44100)
        assert_(np.issubdtype(data.dtype, np.float32))
        assert_equal(data.shape, (441, 2))

        del data


def test_read_4():
    # Contains unsupported 'PEAK' chunk
    for mmap in [False, True]:
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                "Chunk .non-data. not understood, skipping it",
                wavfile.WavFileWarning
            )
            filename = 'test-48000Hz-2ch-64bit-float-le-wavex.wav'
            rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 48000)
        assert_(np.issubdtype(data.dtype, np.float64))
        assert_equal(data.shape, (480, 2))

        del data


def test_read_5():
    # Big-endian float
    for mmap in [False, True]:
        filename = 'test-44100Hz-2ch-32bit-float-be.wav'
        rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 44100)
        assert_(np.issubdtype(data.dtype, np.float32))
        assert_(data.dtype.byteorder == '>' or (sys.byteorder == 'big' and
                                                data.dtype.byteorder == '='))
        assert_equal(data.shape, (441, 2))

        del data


def test_5_bit_odd_size_no_pad():
    # 5-bit, 1 B container, 5 channels, 9 samples, 45 B data chunk
    # Generated by LTspice, which incorrectly omits pad byte, but should be
    # readable anyway
    for mmap in [False, True]:
        filename = 'test-8000Hz-le-5ch-9S-5bit.wav'
        rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 8000)
        assert_(np.issubdtype(data.dtype, np.uint8))
        assert_equal(data.shape, (9, 5))

        # 8-5 = 3 LSBits should be 0
        assert_equal(data & 0b00000111, 0)

        # Unsigned
        assert_equal(data.max(), 0b11111000)  # Highest possible
        assert_equal(data[0, 0], 128)  # Midpoint is 128 for <= 8-bit
        assert_equal(data.min(), 0)  # Lowest possible

        del data


def test_12_bit_even_size():
    # 12-bit, 2 B container, 4 channels, 9 samples, 72 B data chunk
    # Generated by LTspice from 1 Vpk sine waves
    for mmap in [False, True]:
        filename = 'test-8000Hz-le-4ch-9S-12bit.wav'
        rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 8000)
        assert_(np.issubdtype(data.dtype, np.int16))
        assert_equal(data.shape, (9, 4))

        # 16-12 = 4 LSBits should be 0
        assert_equal(data & 0b00000000_00001111, 0)

        # Signed
        assert_equal(data.max(), 0b01111111_11110000)  # Highest possible
        assert_equal(data[0, 0], 0)  # Midpoint is 0 for >= 9-bit
        assert_equal(data.min(), -0b10000000_00000000)  # Lowest possible

        del data


def test_24_bit_odd_size_with_pad():
    # 24-bit, 3 B container, 3 channels, 5 samples, 45 B data chunk
    # Should not raise any warnings about the data chunk pad byte
    filename = 'test-8000Hz-le-3ch-5S-24bit.wav'
    rate, data = wavfile.read(datafile(filename), mmap=False)

    assert_equal(rate, 8000)
    assert_(np.issubdtype(data.dtype, np.int32))
    assert_equal(data.shape, (5, 3))

    # All LSBytes should be 0
    assert_equal(data & 0xff, 0)

    # Hand-made max/min samples under different conventions:
    #                      2**(N-1)     2**(N-1)-1     LSB
    assert_equal(data, [[-0x8000_0000, -0x7fff_ff00, -0x200],
                        [-0x4000_0000, -0x3fff_ff00, -0x100],
                        [+0x0000_0000, +0x0000_0000, +0x000],
                        [+0x4000_0000, +0x3fff_ff00, +0x100],
                        [+0x7fff_ff00, +0x7fff_ff00, +0x200]])
    #                     ^ clipped


def test_20_bit_extra_data():
    # 20-bit, 3 B container, 1 channel, 10 samples, 30 B data chunk
    # with extra data filling container beyond the bit depth
    filename = 'test-1234Hz-le-1ch-10S-20bit-extra.wav'
    rate, data = wavfile.read(datafile(filename), mmap=False)

    assert_equal(rate, 1234)
    assert_(np.issubdtype(data.dtype, np.int32))
    assert_equal(data.shape, (10,))

    # All LSBytes should still be 0, because 3 B container in 4 B dtype
    assert_equal(data & 0xff, 0)

    # But it should load the data beyond 20 bits
    assert_((data & 0xf00).any())

    # Full-scale positive/negative samples, then being halved each time
    assert_equal(data, [+0x7ffff000,       # +full-scale 20-bit
                        -0x7ffff000,       # -full-scale 20-bit
                        +0x7ffff000 >> 1,  # +1/2
                        -0x7ffff000 >> 1,  # -1/2
                        +0x7ffff000 >> 2,  # +1/4
                        -0x7ffff000 >> 2,  # -1/4
                        +0x7ffff000 >> 3,  # +1/8
                        -0x7ffff000 >> 3,  # -1/8
                        +0x7ffff000 >> 4,  # +1/16
                        -0x7ffff000 >> 4,  # -1/16
                        ])


def test_36_bit_odd_size():
    # 36-bit, 5 B container, 3 channels, 5 samples, 75 B data chunk + pad
    filename = 'test-8000Hz-le-3ch-5S-36bit.wav'
    rate, data = wavfile.read(datafile(filename), mmap=False)

    assert_equal(rate, 8000)
    assert_(np.issubdtype(data.dtype, np.int64))
    assert_equal(data.shape, (5, 3))

    # 28 LSBits should be 0
    assert_equal(data & 0xfffffff, 0)

    # Hand-made max/min samples under different conventions:
    #            Fixed-point 2**(N-1)    Full-scale 2**(N-1)-1       LSB
    correct = [[-0x8000_0000_0000_0000, -0x7fff_ffff_f000_0000, -0x2000_0000],
               [-0x4000_0000_0000_0000, -0x3fff_ffff_f000_0000, -0x1000_0000],
               [+0x0000_0000_0000_0000, +0x0000_0000_0000_0000, +0x0000_0000],
               [+0x4000_0000_0000_0000, +0x3fff_ffff_f000_0000, +0x1000_0000],
               [+0x7fff_ffff_f000_0000, +0x7fff_ffff_f000_0000, +0x2000_0000]]
    #              ^ clipped

    assert_equal(data, correct)


def test_45_bit_even_size():
    # 45-bit, 6 B container, 3 channels, 5 samples, 90 B data chunk
    filename = 'test-8000Hz-le-3ch-5S-45bit.wav'
    rate, data = wavfile.read(datafile(filename), mmap=False)

    assert_equal(rate, 8000)
    assert_(np.issubdtype(data.dtype, np.int64))
    assert_equal(data.shape, (5, 3))

    # 19 LSBits should be 0
    assert_equal(data & 0x7ffff, 0)

    # Hand-made max/min samples under different conventions:
    #            Fixed-point 2**(N-1)    Full-scale 2**(N-1)-1      LSB
    correct = [[-0x8000_0000_0000_0000, -0x7fff_ffff_fff8_0000, -0x10_0000],
               [-0x4000_0000_0000_0000, -0x3fff_ffff_fff8_0000, -0x08_0000],
               [+0x0000_0000_0000_0000, +0x0000_0000_0000_0000, +0x00_0000],
               [+0x4000_0000_0000_0000, +0x3fff_ffff_fff8_0000, +0x08_0000],
               [+0x7fff_ffff_fff8_0000, +0x7fff_ffff_fff8_0000, +0x10_0000]]
    #              ^ clipped

    assert_equal(data, correct)


def test_53_bit_odd_size():
    # 53-bit, 7 B container, 3 channels, 5 samples, 105 B data chunk + pad
    filename = 'test-8000Hz-le-3ch-5S-53bit.wav'
    rate, data = wavfile.read(datafile(filename), mmap=False)

    assert_equal(rate, 8000)
    assert_(np.issubdtype(data.dtype, np.int64))
    assert_equal(data.shape, (5, 3))

    # 11 LSBits should be 0
    assert_equal(data & 0x7ff, 0)

    # Hand-made max/min samples under different conventions:
    #            Fixed-point 2**(N-1)    Full-scale 2**(N-1)-1    LSB
    correct = [[-0x8000_0000_0000_0000, -0x7fff_ffff_ffff_f800, -0x1000],
               [-0x4000_0000_0000_0000, -0x3fff_ffff_ffff_f800, -0x0800],
               [+0x0000_0000_0000_0000, +0x0000_0000_0000_0000, +0x0000],
               [+0x4000_0000_0000_0000, +0x3fff_ffff_ffff_f800, +0x0800],
               [+0x7fff_ffff_ffff_f800, +0x7fff_ffff_ffff_f800, +0x1000]]
    #              ^ clipped

    assert_equal(data, correct)


def test_64_bit_even_size():
    # 64-bit, 8 B container, 3 channels, 5 samples, 120 B data chunk
    for mmap in [False, True]:
        filename = 'test-8000Hz-le-3ch-5S-64bit.wav'
        rate, data = wavfile.read(datafile(filename), mmap=mmap)

        assert_equal(rate, 8000)
        assert_(np.issubdtype(data.dtype, np.int64))
        assert_equal(data.shape, (5, 3))

        # Hand-made max/min samples under different conventions:
        #            Fixed-point 2**(N-1)    Full-scale 2**(N-1)-1   LSB
        correct = [[-0x8000_0000_0000_0000, -0x7fff_ffff_ffff_ffff, -0x2],
                   [-0x4000_0000_0000_0000, -0x3fff_ffff_ffff_ffff, -0x1],
                   [+0x0000_0000_0000_0000, +0x0000_0000_0000_0000, +0x0],
                   [+0x4000_0000_0000_0000, +0x3fff_ffff_ffff_ffff, +0x1],
                   [+0x7fff_ffff_ffff_ffff, +0x7fff_ffff_ffff_ffff, +0x2]]
        #              ^ clipped

        assert_equal(data, correct)

        del data


def test_unsupported_mmap():
    # Test containers that cannot be mapped to numpy types
    for filename in {'test-8000Hz-le-3ch-5S-24bit.wav',
                     'test-8000Hz-le-3ch-5S-36bit.wav',
                     'test-8000Hz-le-3ch-5S-45bit.wav',
                     'test-8000Hz-le-3ch-5S-53bit.wav',
                     'test-1234Hz-le-1ch-10S-20bit-extra.wav'}:
        with raises(ValueError, match="mmap.*not compatible"):
            rate, data = wavfile.read(datafile(filename), mmap=True)


def test_rifx():
    # Compare equivalent RIFX and RIFF files
    for rifx, riff in {('test-44100Hz-be-1ch-4bytes.wav',
                        'test-44100Hz-le-1ch-4bytes.wav'),
                       ('test-8000Hz-be-3ch-5S-24bit.wav',
                        'test-8000Hz-le-3ch-5S-24bit.wav')}:
        rate1, data1 = wavfile.read(datafile(rifx), mmap=False)
        rate2, data2 = wavfile.read(datafile(riff), mmap=False)
        assert_equal(rate1, rate2)
        assert_equal(data1, data2)


def test_rf64():
    # Compare equivalent RF64 and RIFF files
    for rf64, riff in {('test-44100Hz-le-1ch-4bytes-rf64.wav',
                        'test-44100Hz-le-1ch-4bytes.wav'),
                       ('test-8000Hz-le-3ch-5S-24bit-rf64.wav',
                        'test-8000Hz-le-3ch-5S-24bit.wav')}:
        rate1, data1 = wavfile.read(datafile(rf64), mmap=False)
        rate2, data2 = wavfile.read(datafile(riff), mmap=False)
        assert_array_equal(rate1, rate2)
        assert_array_equal(data1, data2)


@pytest.mark.xslow
def test_write_roundtrip_rf64(tmpdir):
    dtype = np.dtype("<i8")
    tmpfile = str(tmpdir.join('temp.wav'))
    rate = 44100
    data = np.random.randint(0, 127, (2**29,)).astype(dtype)

    wavfile.write(tmpfile, rate, data)

    rate2, data2 = wavfile.read(tmpfile, mmap=True)

    assert_equal(rate, rate2)
    msg = f"{data2.dtype} byteorder not in ('<', '=', '|')"
    assert data2.dtype.byteorder in ('<', '=', '|'), msg
    assert_array_equal(data, data2)
    # also test writing (gh-12176)
    data2[0] = 0


# Fake a non-seekable file-like object without resorting to subprocesses.
class Nonseekable:
    def __init__(self, fp):
        self.fp = fp

    def seekable(self):
        return False
    
    def read(self, size=-1, /):
        return self.fp.read(size)
    
    def close(self):
        self.fp.close()


def test_streams():
    for filename in ['test-44100Hz-le-1ch-4bytes.wav',
                     'test-8000Hz-le-2ch-1byteu.wav',
                     'test-44100Hz-2ch-32bit-float-le.wav',
                     'test-44100Hz-2ch-32bit-float-be.wav',
                     'test-8000Hz-le-5ch-9S-5bit.wav',
                     'test-8000Hz-le-4ch-9S-12bit.wav',
                     'test-8000Hz-le-3ch-5S-24bit.wav',
                     'test-1234Hz-le-1ch-10S-20bit-extra.wav',
                     'test-8000Hz-le-3ch-5S-36bit.wav',
                     'test-8000Hz-le-3ch-5S-45bit.wav',
                     'test-8000Hz-le-3ch-5S-53bit.wav',
                     'test-8000Hz-le-3ch-5S-64bit.wav',
                     'test-44100Hz-be-1ch-4bytes.wav', # RIFX
                     'test-44100Hz-le-1ch-4bytes-rf64.wav']:
        dfname = datafile(filename)
        with open(dfname, 'rb') as fp1, open(dfname, 'rb') as fp2:
            rate1, data1 = wavfile.read(fp1)
            rate2, data2 = wavfile.read(Nonseekable(fp2))
            rate3, data3 = wavfile.read(dfname, mmap=False)
            assert_equal(rate1, rate3)
            assert_equal(rate2, rate3)
            assert_equal(data1, data3)
            assert_equal(data2, data3)


def test_read_unknown_filetype_fail():
    # Not an RIFF
    for mmap in [False, True]:
        filename = 'example_1.nc'
        with open(datafile(filename), 'rb') as fp:
            with raises(ValueError, match="CDF.*'RIFF', 'RIFX', and 'RF64' supported"):
                wavfile.read(fp, mmap=mmap)


def test_read_unknown_riff_form_type():
    # RIFF, but not WAVE form
    for mmap in [False, True]:
        filename = 'Transparent Busy.ani'
        with open(datafile(filename), 'rb') as fp:
            with raises(ValueError, match='Not a WAV file.*ACON'):
                wavfile.read(fp, mmap=mmap)


def test_read_unknown_wave_format():
    # RIFF and WAVE, but not supported format
    for mmap in [False, True]:
        filename = 'test-8000Hz-le-1ch-1byte-ulaw.wav'
        with open(datafile(filename), 'rb') as fp:
            with raises(ValueError, match='Unknown wave file format.*MULAW.*'
                        'Supported formats'):
                wavfile.read(fp, mmap=mmap)


def test_read_early_eof_with_data():
    # File ends inside 'data' chunk, but we keep incomplete data
    for mmap in [False, True]:
        filename = 'test-44100Hz-le-1ch-4bytes-early-eof.wav'
        with open(datafile(filename), 'rb') as fp:
            with warns(wavfile.WavFileWarning, match='Reached EOF'):
                rate, data = wavfile.read(fp, mmap=mmap)
                assert data.size > 0
                assert rate == 44100
                # also test writing (gh-12176)
                data[0] = 0


def test_read_early_eof():
    # File ends after 'fact' chunk at boundary, no data read
    for mmap in [False, True]:
        filename = 'test-44100Hz-le-1ch-4bytes-early-eof-no-data.wav'
        with open(datafile(filename), 'rb') as fp:
            with raises(ValueError, match="Unexpected end of file."):
                wavfile.read(fp, mmap=mmap)


def test_read_incomplete_chunk():
    # File ends inside 'fmt ' chunk ID, no data read
    for mmap in [False, True]:
        filename = 'test-44100Hz-le-1ch-4bytes-incomplete-chunk.wav'
        with open(datafile(filename), 'rb') as fp:
            with raises(ValueError, match="Incomplete chunk ID.*b'f'"):
                wavfile.read(fp, mmap=mmap)


def test_read_inconsistent_header():
    # File header's size fields contradict each other
    for mmap in [False, True]:
        filename = 'test-8000Hz-le-3ch-5S-24bit-inconsistent.wav'
        with open(datafile(filename), 'rb') as fp:
            with raises(ValueError, match="header is invalid"):
                wavfile.read(fp, mmap=mmap)


# signed 8-bit integer PCM is not allowed
# unsigned > 8-bit integer PCM is not allowed
# 8- or 16-bit float PCM is not expected
# g and q are platform-dependent, so not included
@pytest.mark.parametrize("dt_str", ["<i2", "<i4", "<i8", "<f4", "<f8",
                                    ">i2", ">i4", ">i8", ">f4", ">f8", '|u1'])
@pytest.mark.parametrize("channels", [1, 2, 5])
@pytest.mark.parametrize("rate", [8000, 32000])
@pytest.mark.parametrize("mmap", [False, True])
@pytest.mark.parametrize("realfile", [False, True])
def test_write_roundtrip(realfile, mmap, rate, channels, dt_str, tmpdir):
    dtype = np.dtype(dt_str)
    if realfile:
        tmpfile = str(tmpdir.join(str(threading.get_native_id()), 'temp.wav'))
        os.makedirs(os.path.dirname(tmpfile), exist_ok=True)
    else:
        tmpfile = BytesIO()
    data = np.random.rand(100, channels)
    if channels == 1:
        data = data[:, 0]
    if dtype.kind == 'f':
        # The range of the float type should be in [-1, 1]
        data = data.astype(dtype)
    else:
        data = (data*128).astype(dtype)

    wavfile.write(tmpfile, rate, data)

    rate2, data2 = wavfile.read(tmpfile, mmap=mmap)

    assert_equal(rate, rate2)
    assert_(data2.dtype.byteorder in ('<', '=', '|'), msg=data2.dtype)
    assert_array_equal(data, data2)
    # also test writing (gh-12176)
    if realfile:
        data2[0] = 0
    else:
        with pytest.raises(ValueError, match='read-only'):
            data2[0] = 0

    if realfile and mmap and IS_PYPY and sys.platform == 'win32':
        # windows cannot remove a dead file held by a mmap but not collected
        # in PyPy; since the filename gets reused in this test, clean this up
        break_cycles()
        break_cycles()


@pytest.mark.parametrize("dtype", [np.float16])
def test_wavfile_dtype_unsupported(tmpdir, dtype):
    tmpfile = str(tmpdir.join('temp.wav'))
    rng = np.random.default_rng(1234)
    data = rng.random((100, 5)).astype(dtype)
    rate = 8000
    with pytest.raises(ValueError, match="Unsupported"):
        wavfile.write(tmpfile, rate, data)

def test_seek_emulating_reader_invalid_seek():
    # Dummy data for the reader
    reader = wavfile.SeekEmulatingReader(BytesIO(b'\x00\x00'))
    
    # Test SEEK_END with an invalid whence value
    with pytest.raises(UnsupportedOperation):
        reader.seek(0, 5)  # Invalid whence value
    
    # Test with negative seek value
    with pytest.raises(UnsupportedOperation):
        reader.seek(-1, 0)  # Negative position with SEEK_SET
    
    # Test SEEK_END with valid parameters (should not raise)
    pos = reader.seek(0, os.SEEK_END)  # Valid usage
    assert pos == 2, f"Failed to seek to end, got position {pos}"
